/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.processors.kafka.pubsub;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.FlowFileFilters;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.RecordPathResult;
import org.apache.nifi.record.path.util.RecordPathCache;
import org.apache.nifi.record.path.validation.RecordPathValidator;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;

import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;

@Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "2.0"})
@CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Kafka using the Kafka 2.0 Producer API. "
    + "The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. "
    + "The complementary NiFi processor for fetching messages is ConsumeKafkaRecord_2_0.")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
    description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
    + " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
    + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ",
    expressionLanguageScope = VARIABLE_REGISTRY)
@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to "
    + "FlowFiles that are routed to success.")
@SeeAlso({PublishKafka_2_0.class, ConsumeKafka_2_0.class, ConsumeKafkaRecord_2_0.class})
public class PublishKafkaRecord_2_0 extends AbstractProcessor {
    protected static final String MSG_COUNT = "msg.count";

    static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery",
        "FlowFile will be routed to failure unless the message is replicated to the appropriate "
            + "number of Kafka Nodes according to the Topic configuration");
    static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery",
        "FlowFile will be routed to success if the message is received by a single Kafka node, "
            + "whether or not it is replicated. This is faster than <Guarantee Replicated Delivery> "
            + "but can result in data loss if a Kafka node crashes");
    static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort",
        "FlowFile will be routed to success after successfully writing the content to a Kafka node, "
            + "without waiting for a response. This provides the best performance but may result in data loss.");

    static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue(Partitioners.RoundRobinPartitioner.class.getName(),
        Partitioners.RoundRobinPartitioner.class.getSimpleName(),
        "Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, "
            + "the next Partition to Partition 2, and so on, wrapping as necessary.");
    static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
        "DefaultPartitioner", "Messages will be assigned to random partitions.");
    static final AllowableValue RECORD_PATH_PARTITIONING = new AllowableValue(Partitioners.RecordPathPartitioner.class.getName(),
        "RecordPath Partitioner", "Interprets the <Partition> property as a RecordPath that will be evaluated against each Record to determine which partition the Record will go to. All Records " +
        "that have the same value for the given RecordPath will go to the same Partition.");
    static final AllowableValue EXPRESSION_LANGUAGE_PARTITIONING = new AllowableValue(Partitioners.ExpressionLanguagePartitioner.class.getName(), "Expression Language Partitioner",
        "Interprets the <Partition> property as Expression Language that will be evaluated against each FlowFile. This Expression will be evaluated once against the FlowFile, " +
            "so all Records in a given FlowFile will go to the same partition.");


    static final PropertyDescriptor TOPIC = new Builder()
        .name("topic")
        .displayName("Topic Name")
        .description("The name of the Kafka Topic to publish to.")
        .required(true)
        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
        .build();

    static final PropertyDescriptor RECORD_READER = new Builder()
        .name("record-reader")
        .displayName("Record Reader")
        .description("The Record Reader to use for incoming FlowFiles")
        .identifiesControllerService(RecordReaderFactory.class)
        .expressionLanguageSupported(NONE)
        .required(true)
        .build();

    static final PropertyDescriptor RECORD_WRITER = new Builder()
        .name("record-writer")
        .displayName("Record Writer")
        .description("The Record Writer to use in order to serialize the data before sending to Kafka")
        .identifiesControllerService(RecordSetWriterFactory.class)
        .expressionLanguageSupported(NONE)
        .required(true)
        .build();

    static final PropertyDescriptor MESSAGE_KEY_FIELD = new Builder()
        .name("message-key-field")
        .displayName("Message Key Field")
        .description("The name of a field in the Input Records that should be used as the Key for the Kafka message.")
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
        .required(false)
        .build();

    static final PropertyDescriptor DELIVERY_GUARANTEE = new Builder()
        .name("acks")
        .displayName("Delivery Guarantee")
        .description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.")
        .required(true)
        .expressionLanguageSupported(NONE)
        .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
        .defaultValue(DELIVERY_BEST_EFFORT.getValue())
        .build();

    static final PropertyDescriptor METADATA_WAIT_TIME = new Builder()
        .name("max.block.ms")
        .displayName("Max Metadata Wait Time")
        .description("The amount of time publisher will wait to obtain metadata or wait for the buffer to flush during the 'send' call before failing the "
            + "entire 'send' call. Corresponds to Kafka's 'max.block.ms' property")
        .required(true)
        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
        .expressionLanguageSupported(VARIABLE_REGISTRY)
        .defaultValue("5 sec")
        .build();

    static final PropertyDescriptor ACK_WAIT_TIME = new Builder()
        .name("ack.wait.time")
        .displayName("Acknowledgment Wait Time")
        .description("After sending a message to Kafka, this indicates the amount of time that we are willing to wait for a response from Kafka. "
            + "If Kafka does not acknowledge the message within this time period, the FlowFile will be routed to 'failure'.")
        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
        .expressionLanguageSupported(NONE)
        .required(true)
        .defaultValue("5 secs")
        .build();

    static final PropertyDescriptor MAX_REQUEST_SIZE = new Builder()
        .name("max.request.size")
        .displayName("Max Request Size")
        .description("The maximum size of a request in bytes. Corresponds to Kafka's 'max.request.size' property and defaults to 1 MB (1048576).")
        .required(true)
        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
        .defaultValue("1 MB")
        .build();

    static final PropertyDescriptor PARTITION_CLASS = new Builder()
        .name("partitioner.class")
        .displayName("Partitioner class")
        .description("Specifies which class to use to compute a partition id for a message. Corresponds to Kafka's 'partitioner.class' property.")
        .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING, RECORD_PATH_PARTITIONING, EXPRESSION_LANGUAGE_PARTITIONING)
        .defaultValue(RANDOM_PARTITIONING.getValue())
        .required(false)
        .build();

    static final PropertyDescriptor PARTITION = new Builder()
        .name("partition")
        .displayName("Partition")
        .description("Specifies which Partition Records will go to. How this value is interpreted is dictated by the <Partitioner class> property.")
        .required(false)
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
        .build();

    static final PropertyDescriptor COMPRESSION_CODEC = new Builder()
        .name("compression.type")
        .displayName("Compression Type")
        .description("This parameter allows you to specify the compression codec for all data generated by this producer.")
        .required(true)
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .allowableValues("none", "gzip", "snappy", "lz4")
        .defaultValue("none")
        .build();

    static final PropertyDescriptor ATTRIBUTE_NAME_REGEX = new Builder()
        .name("attribute-name-regex")
        .displayName("Attributes to Send as Headers (Regex)")
        .description("A Regular Expression that is matched against all FlowFile attribute names. "
            + "Any attribute whose name matches the regex will be added to the Kafka messages as a Header. "
            + "If not specified, no FlowFile attributes will be added as headers.")
        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
        .expressionLanguageSupported(NONE)
        .required(false)
        .build();
    static final PropertyDescriptor USE_TRANSACTIONS = new Builder()
        .name("use-transactions")
        .displayName("Use Transactions")
        .description("Specifies whether or not NiFi should provide Transactional guarantees when communicating with Kafka. If there is a problem sending data to Kafka, "
            + "and this property is set to false, then the messages that have already been sent to Kafka will continue on and be delivered to consumers. "
            + "If this is set to true, then the Kafka transaction will be rolled back so that those messages are not available to consumers. Setting this to true "
            + "requires that the <Delivery Guarantee> property be set to \"Guarantee Replicated Delivery.\"")
        .expressionLanguageSupported(NONE)
        .allowableValues("true", "false")
        .defaultValue("true")
        .required(true)
        .build();
    static final PropertyDescriptor TRANSACTIONAL_ID_PREFIX = new Builder()
        .name("transactional-id-prefix")
        .displayName("Transactional Id Prefix")
        .description("When Use Transaction is set to true, KafkaProducer config 'transactional.id' will be a generated UUID and will be prefixed with this string.")
        .expressionLanguageSupported(VARIABLE_REGISTRY)
        .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
        .required(false)
        .build();
    static final PropertyDescriptor MESSAGE_HEADER_ENCODING = new Builder()
        .name("message-header-encoding")
        .displayName("Message Header Encoding")
        .description("For any attribute that is added as a message header, as configured via the <Attributes to Send as Headers> property, "
            + "this property indicates the Character Encoding to use for serializing the headers.")
        .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
        .defaultValue("UTF-8")
        .required(false)
        .build();

    static final Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description("FlowFiles for which all content was sent to Kafka.")
        .build();

    static final Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship")
        .build();

    private static final List<PropertyDescriptor> PROPERTIES;
    private static final Set<Relationship> RELATIONSHIPS;

    private volatile PublisherPool publisherPool = null;
    private final RecordPathCache recordPathCache = new RecordPathCache(25);

    static {
        final List<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(KafkaProcessorUtils.BOOTSTRAP_SERVERS);
        properties.add(TOPIC);
        properties.add(RECORD_READER);
        properties.add(RECORD_WRITER);
        properties.add(USE_TRANSACTIONS);
        properties.add(TRANSACTIONAL_ID_PREFIX);
        properties.add(DELIVERY_GUARANTEE);
        properties.add(ATTRIBUTE_NAME_REGEX);
        properties.add(MESSAGE_HEADER_ENCODING);
        properties.add(KafkaProcessorUtils.SECURITY_PROTOCOL);
        properties.add(KafkaProcessorUtils.SASL_MECHANISM);
        properties.add(KafkaProcessorUtils.KERBEROS_CREDENTIALS_SERVICE);
        properties.add(KafkaProcessorUtils.JAAS_SERVICE_NAME);
        properties.add(KafkaProcessorUtils.USER_PRINCIPAL);
        properties.add(KafkaProcessorUtils.USER_KEYTAB);
        properties.add(KafkaProcessorUtils.USERNAME);
        properties.add(KafkaProcessorUtils.PASSWORD);
        properties.add(KafkaProcessorUtils.TOKEN_AUTH);
        properties.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE);
        properties.add(MESSAGE_KEY_FIELD);
        properties.add(MAX_REQUEST_SIZE);
        properties.add(ACK_WAIT_TIME);
        properties.add(METADATA_WAIT_TIME);
        properties.add(PARTITION_CLASS);
        properties.add(PARTITION);
        properties.add(COMPRESSION_CODEC);

        PROPERTIES = Collections.unmodifiableList(properties);

        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    @Override
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
        return new Builder()
            .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
            .name(propertyDescriptorName)
            .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class))
            .dynamic(true)
            .expressionLanguageSupported(VARIABLE_REGISTRY)
            .build();
    }

    @Override
    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
        final List<ValidationResult> results = new ArrayList<>();
        results.addAll(KafkaProcessorUtils.validateCommonProperties(validationContext));

        final boolean useTransactions = validationContext.getProperty(USE_TRANSACTIONS).asBoolean();
        if (useTransactions) {
            final String deliveryGuarantee = validationContext.getProperty(DELIVERY_GUARANTEE).getValue();
            if (!DELIVERY_REPLICATED.getValue().equals(deliveryGuarantee)) {
                results.add(new ValidationResult.Builder()
                    .subject("Delivery Guarantee")
                    .valid(false)
                    .explanation("In order to use Transactions, the Delivery Guarantee must be \"Guarantee Replicated Delivery.\" "
                        + "Either change the <Use Transactions> property or the <Delivery Guarantee> property.")
                    .build());
            }
        }

        final String partitionClass = validationContext.getProperty(PARTITION_CLASS).getValue();
        if (RECORD_PATH_PARTITIONING.getValue().equals(partitionClass)) {
            final String rawRecordPath = validationContext.getProperty(PARTITION).getValue();
            if (rawRecordPath == null) {
                results.add(new ValidationResult.Builder()
                    .subject("Partition")
                    .valid(false)
                    .explanation("The <Partition> property must be specified if using the RecordPath Partitioning class")
                    .build());
            } else if (!validationContext.isExpressionLanguagePresent(rawRecordPath)) {
                final ValidationResult result = new RecordPathValidator().validate(PARTITION.getDisplayName(), rawRecordPath, validationContext);
                if (result != null) {
                    results.add(result);
                }
            }
        } else if (EXPRESSION_LANGUAGE_PARTITIONING.getValue().equals(partitionClass)) {
            final String rawRecordPath = validationContext.getProperty(PARTITION).getValue();
            if (rawRecordPath == null) {
                results.add(new ValidationResult.Builder()
                    .subject("Partition")
                    .valid(false)
                    .explanation("The <Partition> property must be specified if using the Expression Language Partitioning class")
                    .build());
            }
        }

        return results;
    }

    private synchronized PublisherPool getPublisherPool(final ProcessContext context) {
        PublisherPool pool = publisherPool;
        if (pool != null) {
            return pool;
        }

        return publisherPool = createPublisherPool(context);
    }

    protected PublisherPool createPublisherPool(final ProcessContext context) {
        final int maxMessageSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue();
        final long maxAckWaitMillis = context.getProperty(ACK_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS).longValue();

        final String attributeNameRegex = context.getProperty(ATTRIBUTE_NAME_REGEX).getValue();
        final Pattern attributeNamePattern = attributeNameRegex == null ? null : Pattern.compile(attributeNameRegex);
        final boolean useTransactions = context.getProperty(USE_TRANSACTIONS).asBoolean();
        final String transactionalIdPrefix = context.getProperty(TRANSACTIONAL_ID_PREFIX).evaluateAttributeExpressions().getValue();
        Supplier<String> transactionalIdSupplier = KafkaProcessorUtils.getTransactionalIdSupplier(transactionalIdPrefix);

        final String charsetName = context.getProperty(MESSAGE_HEADER_ENCODING).evaluateAttributeExpressions().getValue();
        final Charset charset = Charset.forName(charsetName);

        final Map<String, Object> kafkaProperties = new HashMap<>();
        KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProperties);
        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        kafkaProperties.put("max.request.size", String.valueOf(maxMessageSize));

        return new PublisherPool(kafkaProperties, getLogger(), maxMessageSize, maxAckWaitMillis, useTransactions, transactionalIdSupplier, attributeNamePattern, charset);
    }

    @OnStopped
    public void closePool() {
        if (publisherPool != null) {
            publisherPool.close();
        }

        publisherPool = null;
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(1, DataUnit.MB, 500));
        if (flowFiles.isEmpty()) {
            return;
        }

        final PublisherPool pool = getPublisherPool(context);
        if (pool == null) {
            context.yield();
            return;
        }

        final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
        final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        final boolean useTransactions = context.getProperty(USE_TRANSACTIONS).asBoolean();

        final long startTime = System.nanoTime();
        try (final PublisherLease lease = pool.obtainPublisher()) {
            try {
                if (useTransactions) {
                    lease.beginTransaction();
                }

                // Send each FlowFile to Kafka asynchronously.
                final Iterator<FlowFile> itr = flowFiles.iterator();
                while (itr.hasNext()) {
                    final FlowFile flowFile = itr.next();

                    if (!isScheduled()) {
                        // If stopped, re-queue FlowFile instead of sending it
                        if (useTransactions) {
                            session.rollback();
                            lease.rollback();
                            return;
                        }

                        session.transfer(flowFile);
                        itr.remove();
                        continue;
                    }

                    final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
                    final String messageKeyField = context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue();

                    final Function<Record, Integer> partitioner = getPartitioner(context, flowFile);

                    try {
                        session.read(flowFile, new InputStreamCallback() {
                            @Override
                            public void process(final InputStream in) throws IOException {
                                try {
                                    final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
                                    final RecordSet recordSet = reader.createRecordSet();

                                    final RecordSchema schema = writerFactory.getSchema(flowFile.getAttributes(), recordSet.getSchema());
                                    lease.publish(flowFile, recordSet, writerFactory, schema, messageKeyField, topic, partitioner);
                                } catch (final SchemaNotFoundException | MalformedRecordException e) {
                                    throw new ProcessException(e);
                                }
                            }
                        });
                    } catch (final Exception e) {
                        // The FlowFile will be obtained and the error logged below, when calling publishResult.getFailedFlowFiles()
                        lease.fail(flowFile, e);
                        continue;
                    }
                }

                // Complete the send
                final PublishResult publishResult = lease.complete();

                if (publishResult.isFailure()) {
                    getLogger().info("Failed to send FlowFile to kafka; transferring to failure");
                    session.transfer(flowFiles, REL_FAILURE);
                    return;
                }

                // Transfer any successful FlowFiles.
                final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
                for (FlowFile success : flowFiles) {
                    final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(success).getValue();

                    final int msgCount = publishResult.getSuccessfulMessageCount(success);
                    success = session.putAttribute(success, MSG_COUNT, String.valueOf(msgCount));
                    session.adjustCounter("Messages Sent", msgCount, true);

                    final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topic);
                    session.getProvenanceReporter().send(success, transitUri, "Sent " + msgCount + " messages", transmissionMillis);
                    session.transfer(success, REL_SUCCESS);
                }
            } catch (final ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
                lease.poison();
                getLogger().error("Failed to send messages to Kafka; will yield Processor and transfer FlowFiles to failure");
                session.transfer(flowFiles, REL_FAILURE);
                context.yield();
            }
        }
    }

    private Function<Record, Integer> getPartitioner(final ProcessContext context, final FlowFile flowFile) {
        final String partitionClass = context.getProperty(PARTITION_CLASS).getValue();
        if (RECORD_PATH_PARTITIONING.getValue().equals(partitionClass)) {
            final String recordPath = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
            final RecordPath compiled = recordPathCache.getCompiled(recordPath);

            return record -> evaluateRecordPath(compiled, record);
        } else if (EXPRESSION_LANGUAGE_PARTITIONING.getValue().equals(partitionClass)) {
            final String partition = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
            final int hash = Objects.hashCode(partition);
            return (record) -> hash;
        }

        return null;
    }

    private Integer evaluateRecordPath(final RecordPath recordPath, final Record record) {
        final RecordPathResult result = recordPath.evaluate(record);
        final LongAccumulator accumulator = new LongAccumulator(Long::sum, 0);

        result.getSelectedFields().forEach(fieldValue -> {
            final Object value = fieldValue.getValue();
            final long hash = Objects.hashCode(value);
            accumulator.accumulate(hash);
        });

        return accumulator.intValue();
    }
}
